Dataset Operators
You can group the operators to use with Datasets by their target, i.e. the part of a Dataset they are applied to.
Besides the operators above, the following ones work with a Dataset as a whole.
| Operator | Description |
|---|---|
| as | Converting a Dataset to a Dataset of the given type |
| coalesce | Repartitioning a Dataset with shuffle disabled |
| explain | Explain logical and physical plans of a Dataset |
| randomSplit | Randomly split a Dataset into two or more Datasets |
| repartition | Repartitioning a Dataset with shuffle enabled |
| toJSON | Converts a Dataset to a Dataset of JSON strings |
| transform | Transforms a Dataset to produce a new Dataset |
createTempViewCommand Internal Operator
Caution: FIXME
createGlobalTempView Operator
Caution: FIXME
createOrReplaceTempView Operator
Caution: FIXME
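As a minimal sketch (not from the source), createOrReplaceTempView registers a Dataset under a name so it can be queried with SQL; the view name below is arbitrary.

val nums = spark.range(5)
nums.createOrReplaceTempView("nums")
spark.sql("SELECT id FROM nums WHERE id > 2").show   // prints the rows with ids 3 and 4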
createTempView Operator
Caution: FIXME
Transforming Datasets — transform Operator
transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]
transform applies the t function to the source Dataset[T] to produce a result Dataset[U]. It is meant for chaining custom transformations.
val dataset = spark.range(5)
// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)
scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
+---+-------+
Internally, transform executes the t function on the current Dataset[T].
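Since each transformation is an ordinary function, transform calls chain naturally. A small sketch continuing the example above (withTripled is a made-up helper):

import org.apache.spark.sql.DataFrame
// a hypothetical second transformation to chain with withDoubled above
def withTripled(df: DataFrame): DataFrame = df.withColumn("tripled", 'id * 3)
dataset.transform(withDoubled).transform(withTripled).show   // shows id, doubled and tripled columns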
Converting to DataFrame — toDF Methods
toDF(): DataFrame
toDF(colNames: String*): DataFrame
Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.
Caution: FIXME Describe toDF(colNames: String*)
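Pending the description above, a minimal sketch of toDF(colNames: String*), which renames the columns while converting to a DataFrame (assuming spark.implicits._ is in scope, as in spark-shell):

val pairs = Seq((0, "zero"), (1, "one")).toDS   // columns _1 and _2
val df = pairs.toDF("id", "name")               // same data, columns renamed to id and name
df.printSchema()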
Converting to Dataset — as Method
Caution: FIXME
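Pending the description above, a minimal sketch of as, which converts a DataFrame (i.e. Dataset[Row]) into a strongly-typed Dataset given an Encoder (here derived from a made-up Person case class via spark.implicits._):

final case class Person(id: Long, name: String)
val df = Seq((0L, "Jacek"), (1L, "Agata")).toDF("id", "name")
val people = df.as[Person]    // Dataset[Person]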
Accessing DataFrameWriter — write Method
write: DataFrameWriter[T]
write method returns DataFrameWriter for records of type T.
import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
Accessing DataStreamWriter — writeStream Method
writeStream: DataStreamWriter[T]
writeStream method returns DataStreamWriter for records of type T.
val papers = spark.readStream.text("papers").as[String]
import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream
Display Records — show Methods
show(): Unit
show(numRows: Int): Unit
show(truncate: Boolean): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
Caution: FIXME
Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes the first n records.
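A quick sketch of the truncate parameter (the sample value is made up): by default show truncates strings longer than 20 characters, and truncate = false turns that off.

val q = Seq("a fairly long line of text that is over twenty characters").toDF("text")
q.show()                      // the text column is truncated to 20 characters by default
q.show(1, truncate = false)   // full column value, at most 1 row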
Taking First n Records — take Action
take(n: Int): Array[T]
take is an action on a Dataset that returns a collection of n records.
Warning: take loads all the data into the memory of the Spark application's driver process and for a large n could result in OutOfMemoryError.
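For example (a small sketch):

val first3 = spark.range(10).take(3)
// first3: Array[java.lang.Long] = Array(0, 1, 2)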
foreachPartition Action
foreachPartition(f: Iterator[T] => Unit): Unit
foreachPartition applies the f function to each partition of the Dataset.
case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS
ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note: foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink.
mapPartitions Operator
mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.
Caution: FIXME Example
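Pending the example above, a minimal sketch: one output record per partition with the number of records in it (assuming spark.implicits._ is in scope for the Int encoder).

val ds = spark.range(10)
// one output record per input partition: the number of records in that partition
val recordsPerPartition = ds.mapPartitions(iter => Iterator(iter.size))
recordsPerPartition.show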
Creating Zero or More Records — flatMap Operator
flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and the results flattened.
Note: flatMap can create new records. It deprecated explode.
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS
scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
| value|
+-------+
| hello|
| world|
| witaj|
|swiecie|
+-------+
Internally, flatMap calls mapPartitions with func flatMapped over each partition's iterator (i.e. _.flatMap(func)).
Repartitioning Dataset with Shuffle Disabled — coalesce Operator
coalesce(numPartitions: Int): Dataset[T]
The coalesce operator repartitions the Dataset to exactly numPartitions partitions.
Internally, coalesce creates a Repartition logical operator with shuffle disabled (marked as false in the explain output below).
scala> spark.range(5).coalesce(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Optimized Logical Plan ==
Repartition 1, false
+- Range (0, 5, step=1, splits=Some(8))
== Physical Plan ==
Coalesce 1
+- *Range (0, 5, step=1, splits=Some(8))
Repartitioning Dataset with Shuffle Enabled — repartition Operators
repartition(numPartitions: Int): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(partitionExprs: Column*): Dataset[T]
The repartition operators repartition the Dataset to exactly numPartitions partitions or according to the partitionExprs expressions.
Internally, repartition creates a Repartition or RepartitionByExpression logical operator, respectively, with shuffle enabled (marked as true in the explain output below).
scala> spark.range(5).repartition(1).explain(extended = true)
== Parsed Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Optimized Logical Plan ==
Repartition 1, true
+- Range (0, 5, step=1, splits=Some(8))
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *Range (0, 5, step=1, splits=Some(8))
Note: repartition methods correspond to SQL's DISTRIBUTE BY or CLUSTER BY.
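A sketch of the expression-based variant (the column expression is arbitrary): repartitioning by an expression creates a RepartitionByExpression logical operator and rows with equal expression values end up in the same partition.

val ds = spark.range(10)
// rows with the same value of id % 2 end up in the same of the 2 partitions
val byParity = ds.repartition(2, ds("id") % 2)
byParity.explain   // the physical plan uses Exchange with hash partitioning on (id % 2)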
Projecting Columns — select Operators
select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4],
c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution: FIXME
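Pending the description above, a minimal sketch of the typed select variants (assuming spark.implicits._ is in scope): TypedColumns are created with Column.as[U] and the result is a Dataset of the selected types rather than a DataFrame.

import org.apache.spark.sql.Dataset
val q = Seq(("hello", 0.1), ("spark", 0.9)).toDF("name", "score")
val names: Dataset[String] = q.select($"name".as[String])
val pairs: Dataset[(String, Double)] = q.select($"name".as[String], $"score".as[Double])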
filter Operator
Caution: FIXME
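Pending the description above, a sketch of both flavours of filter, the untyped Column-based one and the typed one that takes a Scala function (reusing the Record case class from the foreachPartition example):

val records = Seq(Record(0, "Warsaw"), Record(1, "London"), Record(2, "Paris")).toDS
records.filter($"city" === "Warsaw").show       // untyped, Column-based predicate
records.filter(_.city.startsWith("W")).show     // typed predicate: Record => Boolean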
where Operators
where(condition: Column): Dataset[T]
where(conditionExpr: String): Dataset[T]
where is a synonym for the filter operator, i.e. it simply passes its parameters on to filter.
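For example (a quick sketch, reusing records from the filter example above):

records.where("city = 'Warsaw'").show   // SQL expression string
records.where($"id" > 0).show           // Column-based condition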
Projecting Columns using Expressions — selectExpr Operator
selectExpr(exprs: String*): DataFrame
selectExpr is like select, but accepts SQL expressions exprs.
val ds = spark.range(5)
scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
| random|
+-------------------+
| 0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+
Internally, it executes select with every expression in exprs mapped to Column (using SparkSqlParser.parseExpression).
scala> ds.select(expr("rand() as random")).show
+------------------+
| random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note: A new feature in Spark 2.0.0.
Randomly Split Dataset — randomSplit Operators
randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
randomSplit randomly splits the Dataset according to the weights.
The weights doubles should sum up to 1 and are normalized if they do not.
You can specify a seed; if you don't, a random seed is used.
Note: randomSplit is used in TrainValidationSplit to split a dataset into training and validation datasets.
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
+---+
| id|
+---+
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
Note: A new feature in Spark 2.0.0.
Explaining Logical and Physical Plans — explain Operator
explain(): Unit
explain(extended: Boolean): Unit
explain prints the physical plan (and, with extended enabled, also the parsed, analyzed and optimized logical plans) to the console. Use it to review the structured queries and the optimizations applied.
Tip: If you are serious about query debugging you could also use the Debugging Query Execution facility.
Internally, explain executes an ExplainCommand logical command.
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
toJSON method
toJSON maps the content of Dataset to a Dataset of JSON strings.
Note: A new feature in Spark 2.0.0.
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.toJSON.show
+-------------------+
| value|
+-------------------+
| {"value":"hello"}|
| {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+
Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset) and maps the records (per RDD partition) into JSON.
Note: toJSON uses Jackson's JSON parser, jackson-module-scala.
Accessing Schema — schema Method
A Dataset has a schema.
schema: StructType
Tip: You may also use printSchema and explain to learn about the schema.
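A short sketch of reading the schema programmatically (assuming spark.implicits._ is in scope):

val q = Seq((1, "one"), (2, "two")).toDS
q.schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
// _1: IntegerType
// _2: StringType
q.printSchema()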
Converting Dataset into RDD — rdd Attribute
rdd: RDD[T]
Whenever you need to convert a Dataset into an RDD, the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.
scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.
Note: A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.
It then executes a DeserializeToObject logical operator that produces an RDD[InternalRow], which is converted into the proper RDD[T] using the DataType and T.
Note: It is a lazy operation that "produces" an RDD[T].
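To make the snippet above reproducible, a sketch with a made-up Token type (the original Token definition is not shown in this section):

final case class Token(name: String, score: Double)
val tokens = Seq(Token("hello", 0.4), Token("world", 0.6)).toDS
val rdd: org.apache.spark.rdd.RDD[Token] = tokens.rdd   // a MapPartitionsRDD of Token objects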
isStreaming Method
isStreaming returns true when the Dataset contains StreamingRelation or StreamingExecutionRelation streaming sources.
Note: Streaming Datasets are created using DataStreamReader (i.e. spark.readStream) for StreamingRelation, and contain StreamingExecutionRelation after DataStreamWriter.start.
val helloStream = spark.readStream.text("hello")
scala> helloStream.isStreaming
res9: Boolean = true
Note: A new feature in Spark 2.0.0.
Is Dataset Local — isLocal method
isLocal: Boolean
isLocal is a flag that says whether operators like collect or take could be run locally, i.e. without using executors.
Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.
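A sketch of the distinction: a Dataset created from a local collection is backed by a LocalRelation and is therefore local, while e.g. spark.range is not.

val localDS = Seq(1, 2, 3).toDS
localDS.isLocal            // true: the logical plan is a LocalRelation
spark.range(5).isLocal     // false: the logical plan is a Range operator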